package ins.framework.kafka;

import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

import java.util.*;
import java.util.stream.Collectors;

/**
 * Bootstraps one or more consumer threads ({@link ConsumePartions}) over a fixed
 * list of Kafka topics.
 *
 * <p>Topic discovery via ZooKeeper is currently stubbed out: {@link #getTopicList}
 * returns a hardcoded list, and the periodic topic-refresh loop in {@code main}
 * has been disabled. Broker/group settings are hardcoded for a test environment.
 *
 * <p>NOTE(review): {@code runTopics} and {@code threads} are static mutable lists
 * accessed from {@code main} and {@code destroy}; not thread-safe — confirm
 * {@code destroy} is only invoked from a single shutdown path.
 */
public class App
{
    private static final Logger logger = LoggerFactory.getLogger(App.class);

    // Intended CLI argument keys (argument parsing not yet implemented).
    private static final String ZOOKEEPER_ARGS_KEY = "-zk_addr";
    private static final String TOPIC_TIME_KEY = "-tp_time";   // topic-list refresh interval
    private static final String THREAD_NUMS_KEY = "-threads";  // number of consumer threads

    /** Topics currently being consumed; used to detect newly added topics. */
    private static List<String> runTopics = new ArrayList<>();
    /** All started consumer threads plus the main thread, interrupted by destroy(). */
    private static List<Thread> threads = new ArrayList<>();

    /**
     * Entry point: resolves the topic list and starts the consumer threads.
     * Connection settings are hardcoded; {@code args} is currently ignored.
     *
     * @param args command-line arguments (unused — see the *_KEY constants)
     */
    public static void main( String[] args )
    {
        int threadNums = 1;                  // number of consumer threads (i.e. consumers)
        String zk = "10.10.1.7:2181";        // ZooKeeper address (unused while getTopicList is stubbed)
        int sessionTimeOut = 30000;
        int connectTimeOut = 30000;
        // Broker address — overridden below for the test environment.
        String broker = "10.10.1.8:9092";
        broker = "10.10.68.200:9092";
        // Consumer group id.
        String group = "66";

        runTopics.addAll(getTopicList(zk, sessionTimeOut, connectTimeOut));
        // Consume topic messages on separate threads.
        consumeMsg(threadNums, runTopics, broker, group);

        threads.add(Thread.currentThread());
        // A periodic topic-refresh loop (sleep topicFlushTime, then call
        // getNewAddTopics/consumeMsg for any new topics) was previously here;
        // it is disabled until ZooKeeper-based discovery is re-enabled.
    }

    /**
     * Starts {@code threadNums} consumer threads over the given topics and
     * registers each in {@link #threads} so {@link #destroy()} can stop them.
     *
     * @param threadNums number of consumer threads to start
     * @param subTopics  topics each consumer subscribes to
     * @param broker     Kafka bootstrap servers
     * @param group      consumer group id
     */
    public static void consumeMsg(int threadNums, List<String> subTopics, String broker, String group) {
        for (int i = 0; i < threadNums; i++) {
            ConsumePartions msgReceiver = new ConsumePartions(subTopics, getConsumerConfig(group, broker), i);
            Thread thread = new Thread(msgReceiver);
            threads.add(thread);
            // Name after the registry size so names stay unique across repeated calls.
            thread.setName("consumer " + threads.size());
            thread.start();
        }
    }

    /**
     * Returns the list of topics to consume.
     *
     * <p>ZooKeeper discovery (ZkUtils.getAllTopics) is commented out; a fixed
     * test topic is returned instead.
     *
     * @param zk             ZooKeeper connect string (currently unused)
     * @param sessionTimeOut ZooKeeper session timeout in ms (currently unused)
     * @param connectTimeOut ZooKeeper connection timeout in ms (currently unused)
     * @return the topics to subscribe to
     */
    public static List<String> getTopicList(String zk, int sessionTimeOut, int connectTimeOut) {
        // Original discovery, kept for re-enablement:
        // ZkUtils zkUtils = ZkUtils.apply(zk, sessionTimeOut, connectTimeOut, JaasUtils.isZkSecurityEnabled());
        // List<String> allTopicList = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
        // return allTopicList.stream().filter(topic -> topic.startsWith("postgres")).collect(Collectors.toList());
        List<String> allTopicList = new ArrayList<>();
        allTopicList.add("test6");
        return allTopicList;
    }

    /**
     * Returns topics present in the current topic list but not yet in
     * {@link #runTopics}, i.e. topics added since startup.
     *
     * @param zk             ZooKeeper connect string (passed through to getTopicList)
     * @param sessionTimeOut ZooKeeper session timeout in ms
     * @param connectTimeOut ZooKeeper connection timeout in ms
     * @return newly added topics (possibly empty, never null)
     */
    public static List<String> getNewAddTopics(String zk, int sessionTimeOut, int connectTimeOut) {
        List<String> allTopicListNew = new ArrayList<>();
        for (String topic : getTopicList(zk, sessionTimeOut, connectTimeOut)) {
            if (!runTopics.contains(topic)) {
                logger.info("new topic found: {}", topic);
                allTopicListNew.add(topic);
            }
        }
        return allTopicListNew;
    }

    /**
     * Interrupts every registered thread (consumer threads plus the main thread
     * added in {@code main}) to request shutdown.
     */
    public static void destroy() {
        logger.info("running thread count: {}", threads.size());
        for (Thread t : threads) {
            t.interrupt();
        }
    }

    /**
     * Builds the Kafka consumer configuration.
     *
     * <p>Auto-commit is disabled and offsets start from {@code earliest};
     * keys and values are deserialized as Strings.
     *
     * @param group  consumer group id
     * @param broker Kafka bootstrap servers
     * @return mutable config map passed to {@link ConsumePartions}
     */
    private static Map<String, Object> getConsumerConfig(String group, String broker) {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put("bootstrap.servers", broker);
        configMap.put("group.id", group);
        configMap.put("enable.auto.commit", "false");
        configMap.put("auto.offset.reset", "earliest");
        // SASL settings, kept for environments that require authentication:
        // configMap.put("security.protocol", "SASL_PLAINTEXT");
        // configMap.put("sasl.mechanism", "PLAIN");
        // System.setProperty("java.security.auth.login.config", "E:test\\kafka_client_jaas.conf");
        configMap.put("session.timeout.ms", "30000");
        configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put("max.poll.records", 100);
        return configMap;
    }
}
